package com.uber.hoodie.utilities;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.uber.hoodie.CompactionAdminClient;
import com.uber.hoodie.CompactionAdminClient.RenameOpResult;
import com.uber.hoodie.CompactionAdminClient.ValidationOpResult;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Command line tool that runs a single compaction admin operation (see {@link Operation})
 * through {@link CompactionAdminClient}, optionally printing the result to stdout and/or
 * writing it (Java-serialized) to an output path.
 */
public class HoodieCompactionAdminTool {

  private final Config cfg;

  public HoodieCompactionAdminTool(Config cfg) {
    this.cfg = cfg;
  }

  /**
   * Entry point. Parses the command line into a {@link Config}, builds a Spark context and runs the
   * requested operation. Prints usage and exits with status 1 when no arguments are given or when
   * help is requested.
   *
   * @param args command line arguments, see {@link Config} for the supported options
   * @throws Exception if argument parsing or the admin operation fails
   */
  public static void main(String[] args) throws Exception {
    final Config cfg = new Config();
    final JCommander cmd = new JCommander(cfg);
    // Handle the "no arguments" case before parsing: parsing validates the required options and
    // would throw before usage could ever be printed.
    if (args.length == 0) {
      cmd.usage();
      System.exit(1);
    }
    cmd.parse(args);
    if (cfg.help) {
      cmd.usage();
      System.exit(1);
    }
    HoodieCompactionAdminTool admin = new HoodieCompactionAdminTool(cfg);
    JavaSparkContext jsc = UtilHelpers.buildSparkContext("admin-compactor", cfg.sparkMaster, cfg.sparkMemory);
    try {
      admin.run(jsc);
    } finally {
      // Always release the Spark context created here, even when the operation fails.
      jsc.stop();
    }
  }

  /**
   * Executes one of compaction admin operations, selected by {@code cfg.operation}.
   *
   * <p>When an output path is configured it must not exist yet; this is checked before the
   * operation is executed. The caller keeps ownership of the Spark context; it is not stopped
   * here.
   *
   * @param jsc Spark context used to run the operation
   * @throws IllegalStateException if the configured output path already exists or the operation is
   *                               not supported
   * @throws Exception             if the operation itself or writing its result fails
   */
  public void run(JavaSparkContext jsc) throws Exception {
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath);
    CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath);
    final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
    // Fail fast, before doing any work, rather than clobbering a previous result.
    if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) {
      throw new IllegalStateException("Output File Path already exists");
    }
    switch (cfg.operation) {
      case VALIDATE:
        List<ValidationOpResult> res =
            admin.validateCompactionPlan(metaClient, cfg.compactionInstantTime, cfg.parallelism);
        if (cfg.printOutput) {
          printOperationResult("Result of Validation Operation :", res);
        }
        serializeOperationResult(fs, res);
        break;
      case UNSCHEDULE_FILE:
        List<RenameOpResult> r =
            admin.unscheduleCompactionFileId(cfg.fileId, cfg.skipValidation, cfg.dryRun);
        if (cfg.printOutput) {
          System.out.println(r);
        }
        serializeOperationResult(fs, r);
        break;
      case UNSCHEDULE_PLAN:
        List<RenameOpResult> r2 =
            admin.unscheduleCompactionPlan(cfg.compactionInstantTime, cfg.skipValidation, cfg.parallelism, cfg.dryRun);
        if (cfg.printOutput) {
          printOperationResult("Result of Unscheduling Compaction Plan :", r2);
        }
        serializeOperationResult(fs, r2);
        break;
      case REPAIR:
        List<RenameOpResult> r3 =
            admin.repairCompaction(cfg.compactionInstantTime, cfg.parallelism, cfg.dryRun);
        if (cfg.printOutput) {
          printOperationResult("Result of Repair Operation :", r3);
        }
        serializeOperationResult(fs, r3);
        break;
      default:
        throw new IllegalStateException("Not yet implemented !!");
    }
  }

  /**
   * Writes the operation result to {@code cfg.outputPath} using Java serialization. Does nothing
   * when no output path is configured or the result is null.
   *
   * @param fs     file system used to create the output file
   * @param result operation result to write
   * @throws Exception if the output file cannot be created or written
   */
  private <T> void serializeOperationResult(FileSystem fs, T result) throws Exception {
    if ((cfg.outputPath != null) && (result != null)) {
      Path outputPath = new Path(cfg.outputPath);
      // try-with-resources closes both streams even if serialization fails midway.
      try (FSDataOutputStream fsout = fs.create(outputPath, true);
           ObjectOutputStream out = new ObjectOutputStream(fsout)) {
        out.writeObject(result);
      }
    }
  }

  /**
   * Print Operation Result. The heading is printed on its own line, followed by each entry's
   * string form; no separator is added between entries.
   *
   * @param initialLine Initial Line
   * @param result      Result
   */
  private <T> void printOperationResult(String initialLine, List<T> result) {
    System.out.println(initialLine);
    for (T r : result) {
      System.out.print(r);
    }
  }

  /**
   * Operation Types
   */
  public enum Operation {
    VALIDATE,
    UNSCHEDULE_PLAN,
    UNSCHEDULE_FILE,
    REPAIR
  }

  /**
   * Admin Configuration Options, populated from the command line by JCommander.
   */
  public static class Config implements Serializable {

    // Which admin operation to run.
    @Parameter(names = {"--operation", "-op"}, description = "Operation", required = true)
    public Operation operation = Operation.VALIDATE;
    // Base path of the dataset; used for the meta client, the admin client and the file system.
    @Parameter(names = {"--base-path", "-bp"}, description = "Base path for the dataset", required = true)
    public String basePath = null;
    // Passed to the VALIDATE, UNSCHEDULE_PLAN and REPAIR operations.
    @Parameter(names = {"--instant-time", "-in"}, description = "Compaction Instant time", required = false)
    public String compactionInstantTime = null;
    // Passed to the UNSCHEDULE_FILE operation.
    @Parameter(names = {"--file-id", "-id"}, description = "File Id", required = false)
    public String fileId = null;
    // Passed to the VALIDATE, UNSCHEDULE_PLAN and REPAIR operations.
    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false)
    public int parallelism = 3;
    // Passed to UtilHelpers.buildSparkContext by main.
    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = true)
    public String sparkMaster = null;
    // Passed to UtilHelpers.buildSparkContext by main.
    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
    public String sparkMemory = null;
    // Forwarded to the UNSCHEDULE_FILE, UNSCHEDULE_PLAN and REPAIR operations.
    @Parameter(names = {"--dry-run", "-dr"}, description = "Dry Run Mode", required = false)
    public boolean dryRun = false;
    // Forwarded to the UNSCHEDULE_FILE and UNSCHEDULE_PLAN operations.
    @Parameter(names = {"--skip-validation", "-sv"}, description = "Skip Validation", required = false)
    public boolean skipValidation = false;
    // Optional file to which the serialized result is written; must not already exist.
    @Parameter(names = {"--output-path", "-ot"}, description = "Output Path", required = false)
    public String outputPath = null;
    // Whether the result is printed to stdout.
    @Parameter(names = {"--print-output", "-pt"}, description = "Print Output", required = false)
    public boolean printOutput = true;
    // When set, main prints usage and exits.
    @Parameter(names = {"--help", "-h"}, help = true)
    public Boolean help = false;
  }
}
